Skip to content

1. 前置介绍:为什么需要 Redis Stream?

在 Redis 5.0 之前,使用 Redis 实现消息队列主要有两种方式,但它们都有明显的缺陷:

实现方式描述缺点
PUB/SUB (发布/订阅)经典的发布订阅模式无法持久化。如果出现网络断开或 Redis 宕机,消息就会丢失。
List (LPUSH+BRPOP) / Sorted-Set基于列表或有序集合实现支持持久化,但不支持多播(一对多)、不支持分组消费

为了解决上述问题,Redis 在 5.0 版本引入了 Stream 数据结构。从功能上看,它是 Redis 对消息队列(MQ)的完善实现。

2. Redis Stream 核心功能

Redis Stream 提供了消息的持久化和主备复制功能,允许任何客户端访问任何时刻的数据,并能记住每个客户端的访问位置,保证消息不丢失。其主要功能包括:

  • 消息 ID 的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 消息的广播消费
  • 未完成消息的处理(PEL 机制)
  • 消息队列监控

3. 核心结构与概念解析

Redis Stream 的内部结构是一个消息链表,将所有加入的消息串起来,每个消息都有唯一的 ID 和内容。

Redis Stream 结构图

关键组件说明

  1. Stream Name: Redis 的 key,例如 mystream。在首次使用 XADD 指令追加消息时自动创建。

  2. Consumer Group (消费组): 使用 XGROUP CREATE 命令创建。一个消费组包含多个消费者 (Consumer),这些消费者之间是竞争关系(一条消息只能被组内一个消费者消费)。

  3. last_delivered_id (游标): 每个消费组维护一个 last_delivered_id。任意一个消费者读取了消息,该游标都会向前移动。

  4. pending_ids (PEL - Pending Entries List)这是核心机制。用于维护消费者未确认的消息 ID。

    • 记录了已被客户端读取但尚未 ACK (Acknowledge) 的消息。
    • 如果不 ACK,消息会一直停留在 PEL 中。
    • 一旦消息被 ACK,它会从 PEL 中移除。
    • 作用:确保客户端至少消费了一次消息,防止因网络故障导致消息丢失。

4. 常用命令实战

4.1 消息的添加、读取与删除

添加消息 (XADD)

  • * 表示由服务器自动生成消息 ID。
  • 后面跟随 key value 对。
bash
# 格式: XADD key ID field string [field string ...]
redis:0> xadd test_stream * areaName nanjing
"1715334202451-0"  # 返回生成的ID

获取长度 (XLEN)与范围查询 (XRANGE)

  • - 表示最小值, + 表示最大值。
bash
# 获取消息长度
redis:0> xlen test_stream
"1"

# 添加更多数据用于测试
redis:0> xadd test_stream * areaName shenzhen
"1715334483563-0"

# 查询所有消息
redis:0> xrange test_stream - +
1) 1) "1715334477467-0"
   2) 1) "areaName"
      2) "nanjing"
2) 1) "1715334483563-0"
   2) 1) "areaName"
      2) "shenzhen"

删除消息 (XDEL)

bash
redis:0> xdel test_stream 1715334202451-0
"1"

4.2 不使用消费组消费 (XREAD)

这种模式下,Stream 类似于普通的 List,所有客户端都可以读取同一条消息(类似广播)。

命令格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

  • count: 读取数量
  • milliseconds: 阻塞毫秒数(不设置则为非阻塞)
  • id: 起始 ID (0-0 代表从头开始)

使用技巧:每次读取后,应记录返回的最后一条消息 ID,下次读取时将其作为参数,实现断点续传。

bash
# 从头开始读取 1 条
redis:0> xread count 1 streams test_stream 0-0
1) 1) "test_stream"
   2) 1) 1) "1715334477467-0"
         2) 1) "areaName"
            2) "nanjing"

#以此条 ID 为起点继续读取
redis:0> xread count 1 streams test_stream 1715334477467-0
1) 1) "test_stream"
   2) 1) 1) "1715334483563-0"
         2) 1) "areaName"
            2) "shenzhen"

4.3 使用消费组消费 (重点)

创建消费组 (XGROUP CREATE)

  • 0-0:表示从头开始消费。
  • $:表示只消费新消息(忽略历史消息)。
bash
redis:0> xgroup create test_stream test_group_1 0-0
"OK"

监控 Stream 信息 (XINFO)

bash
# 查看 Stream 整体信息
redis:0> xinfo stream test_stream
 1) "length"
 2) "2"
 ...
 9) "groups"
10) "1"  # 消费组数量

# 查看消费组信息
redis:0> xinfo groups test_stream
1) 1) "name"
   2) "test_group_1"
   3) "consumers"
   4) "0"
   5) "pending" # 正在处理且未 ACK 的消息数
   6) "0"

消费组读取消息 (XREADGROUP)

读取后,消息 ID 会进入 PEL (Pending Entries List)。

命令格式XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key ID

  • >:表示从消费组的 last_delivered_id 后面开始读(即读取未消费过的新消息)。
bash
# 消费者 consumer_1 读取消息
redis:0> xreadgroup group test_group_1 consumer_1 count 1 streams test_stream >
1) 1) "test_stream"
   2) 1) 1) "1715334477467-0"
         2) 1) "areaName"
            2) "nanjing"

# 再次查看消费组状态
redis:0> xinfo groups test_stream
...
5) "pending"
6) "1"       # 发现有1条消息处于 pending 状态
...

确认消息 (XACK)

处理完消息后,必须发送 ACK,否则消息会一直滞留在 PEL 中。

bash
# 确认消息已处理
redis:0> xack test_stream test_group_1 1715334477467-0
"1"

# 再次查看,pending 归零
redis:0> xinfo groups test_stream
...
5) "pending"
6) "0"
...

4.4 处理未完成的消息 (XPENDING)

如果消费者崩溃,消息未 ACK,需要使用 XPENDING 查询并重新处理这些“孤儿”消息。

bash
# 添加并读取一条消息但不 ACK
redis:0> xadd test_stream * areaName dalian
"1715585337107-0"
redis:0> xreadgroup group test_group_1 consumer_1 count 1 streams test_stream >
...

# 查询消费组内的 Pending 消息详情
# 格式: XPENDING key group [start end count] [consumer]
redis:0> xpending test_stream test_group_1
1) "1"               # 未确认消息总数
2) "1715585337107-0" # 起始 ID
3) "1715585337107-0" # 结束 ID
4) 1) 1) "consumer_1"
      2) "1"         # 该消费者的 pending 数量

5. 深入理解:消息 ID 与时钟回拨

5.1 ID 结构

Redis Stream 的 ID 格式为 timestamp-sequence(例如 1715334202451-0):

  • timestamp:毫秒级时间戳(64位整型)。
  • sequence:该毫秒内的序列号(64位整型)。

5.2 解决时钟回拨问题

通常分布式 ID(如雪花算法)会面临服务器时钟回拨导致 ID 重复或乱序的问题。Redis Stream 采用了以下机制解决:

  1. 单调递增:Redis 保证生成的 ID 永远大于前一个 ID。
  2. 维护 latest_generated_id:Redis 会记录最后一个生成的 ID。
  3. 自动修正:如果检测到当前服务器时间戳 小于 latest_generated_id 的时间戳(即发生了时钟回拨),Redis 会保持时间戳不变,强制递增序列号

这种机制确保了即使服务器时间发生跳变,Stream 中的消息 ID 依然保持严格的单调递增特性。